package com.kafka.cn.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * @author: yangShen
 * @Description:
 * @Date: 2020/3/31 15:20
 */
public class MyConsumer {
    public static void main(String[] args) {
        //1.创建消费者的信息
        Properties properties = new Properties();

        //2.给配置信息赋值
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.16.26.16:9092");

        //开启自动提交
        //开启自动提交后，消费者组每消费一次后都会将消费组的消费记录存起来(消费的offset)，下一次消费时会拿上一次的offset为起点继续消费，就是说前面的数据不能消费(读)了，重启服务器也是如此
        //可以设置手动提交
        //此处涉及到下面的配置 --> //重置消费者的offset
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //关闭自动提交
        //此时消费者组获取存储的offset，在offset往后的数据继续消费，消费后offset不会更新，再次重启服务后，仍是offset->目前最高水位
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        //自动提交的延时
        //消费者消费offset到10了，会把10存到一个地方，此处设置提交的延时时间1000毫秒
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        //key,value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata1");

        //重置消费者的offset
        //生效条件：
        // 1.当前消费者组第一次消费时(消费一次后重启服务再次消费，也算第二次消费，会保存消费的提交记录--ENABLE_AUTO_COMMIT_CONFIG=true)，(尝试换个消费者组的名字)
        // 2.之前消费者消费的offset=10,7天已经过去，现在offset<=10的数据都会被删掉，此时最小的offset=1000
        //      现在消费者再次拿着offset=10时生效
        //补充：当前消费者启动时，只去zookeeper或本地读取一次offset，再次读取时会把offset+10的值保存在内存中，第三次消费会去内存中读取offset+10的值而不是本地或zookeeper(速度快)
        //(默认)最早的
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");


        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //订阅主题: 可以订阅不存在的主题，不影响其他主题的数据获取
        consumer.subscribe(Arrays.asList("first","third"));




        //消费者一旦开启就不要关了，除非手动关掉，kill 掉
        while (true){
            //获取数据,消费者采取拉取的方式获取数据，一直要维护长轮询，如果没有数据会浪费资源，如果发现没拉取到数据 会根据设置的延迟时间100毫秒断开长轮询
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            //解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                //提交发送数据的时候可以选择发送key, 但是Linux控制台连接消费者只能看到value, 此处consumer提供查看key的方法
                System.out.println(consumerRecord.key() + "----" + consumerRecord.value()+"----"+consumerRecord.offset());
            }

            //关闭自动提交
            //1.同步提交(不推荐)，当前线程会阻塞知道offset提交完成;同步提交完成后再去重新拉去数据
            //consumer.commitSync();

            //2.异步提交(推荐)
            //当代码走到这里时，进行异步提交的同时，消费者再次去拉去数据
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (null != exception){
                        System.out.println("Commit failed for "+offsets);
                    }
                }
            });

            //1和2总结：同步和异步提交都存在数据重复问题
            //原因：如果消费者在读取数据的时候将数据写入mysql类似数据库，此时offset的提交失败了(offset未更新)，当消费者再次消费读取数据时，
                //会从上次读取数据时的offset进行读，此时已经有部分或全部数据写入到了数据库，所以会重复

            //3.自定义提交
        }



    }


}
